{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "# Creating, Evaluating, and Deploying a Recommendation System"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "## Introduction\n",
    "In this notebook, we demonstrate a data engineering and data science workflow with an end-to-end sample. The scenario is to build a recommender for online book recommendations.\n",
    "\n",
    "\n",
    "There are different types of recommendation algorithms. In this notebook we use a model-based collaborative filtering algorithm named Alternating Least Squares (ALS) matrix factorization.\n",
    "<img src=\"https://negustpublicblob.blob.core.windows.net/public/recommenders.png\" style=\"width:600px;\"/>\n",
    "\n",
    "ALS attempts to estimate the ratings matrix $R$ as the product of two lower-rank matrices, $X$ and $Y$, i.e. $R \\approx X Y^T$. These lower-rank matrices are typically called ‘factor’ matrices. \n",
    "The general approach is iterative. During each iteration, one of the factor matrices is held constant, while the other is solved for using least squares. The newly-solved factor matrix is \n",
    "then held constant while solving for the other factor matrix.\n",
    "\n",
    "<img src=\"https://negustpublicblob.blob.core.windows.net/public/Matrixfactor.svg\" style=\"width:600px;\"/>\n"
   ]
  },
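  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The alternating scheme described above can be made concrete with the standard regularized least-squares formulation of explicit-feedback matrix factorization. This is a sketch rather than the exact objective Spark optimizes; the symbols $x_u$ (row of $X$ for user $u$), $y_i$ (row of $Y$ for item $i$), $r_{ui}$ (an observed rating), $\\Omega$ (the set of observed user-item pairs), and $\\lambda$ (regularization strength) are notation assumed here for illustration.\n",
    "\n",
    "$$\\min_{X,Y} \\sum_{(u,i) \\in \\Omega} \\left(r_{ui} - x_u^\\top y_i\\right)^2 + \\lambda \\left(\\sum_u \\lVert x_u \\rVert^2 + \\sum_i \\lVert y_i \\rVert^2\\right)$$\n",
    "\n",
    "With $Y$ held constant, the problem separates by user, and each $x_u$ has a closed-form ridge-regression solution over the set $\\Omega_u$ of items that user $u$ rated:\n",
    "\n",
    "$$x_u = \\left(\\sum_{i \\in \\Omega_u} y_i y_i^\\top + \\lambda I\\right)^{-1} \\sum_{i \\in \\Omega_u} r_{ui}\\, y_i$$\n",
    "\n",
    "The update for each $y_i$ with $X$ held constant is symmetric. Implementations may differ in details, for example by scaling $\\lambda$ with the number of ratings per user or item, or by using a constrained solver when factors are required to be non-negative."
   ]
  },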
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "## Step 1. Load the Data\n",
    "\n",
    "The book recommendation dataset consists of three files:\n",
    "\n",
    "```\n",
    "+--- Book-Recommendation-Dataset\n",
    "|   +--- Books.csv\n",
    "|   +--- Ratings.csv\n",
    "|   +--- Users.csv\n",
    "```\n",
    "- Books.csv\n",
    "\n",
    "|ISBN|Book-Title|Book-Author|Year-Of-Publication|Publisher|Image-URL-S|Image-URL-M|Image-URL-L|\n",
    "|---|---|---|---|---|---|---|---|\n",
    "|0195153448|Classical Mythology|Mark P. O. Morford|2002|Oxford University Press|http://images.amazon.com/images/P/0195153448.01.THUMBZZZ.jpg|http://images.amazon.com/images/P/0195153448.01.MZZZZZZZ.jpg|http://images.amazon.com/images/P/0195153448.01.LZZZZZZZ.jpg|\n",
    "|0002005018|Clara Callan|Richard Bruce Wright|2001|HarperFlamingo Canada|http://images.amazon.com/images/P/0002005018.01.THUMBZZZ.jpg|http://images.amazon.com/images/P/0002005018.01.MZZZZZZZ.jpg|http://images.amazon.com/images/P/0002005018.01.LZZZZZZZ.jpg|\n",
    "\n",
    "- Ratings.csv\n",
    "\n",
    "|User-ID|ISBN|Book-Rating|\n",
    "|---|---|---|\n",
    "|276725|034545104X|0|\n",
    "|276726|0155061224|5|\n",
    "\n",
    "- Users.csv\n",
    "\n",
    "|User-ID|Location|Age|\n",
    "|---|---|---|\n",
    "|1|\"nyc, new york, usa\"||\n",
    "|2|\"stockton, california, usa\"|18.0|"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "**By defining the parameters below, we can easily apply this notebook to different datasets.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually\n",
    "\n",
    "USER_ID_COL = \"User-ID\"  # must not be '_user_id' for this notebook to run successfully\n",
    "ITEM_ID_COL = \"ISBN\"  # must not be '_item_id' for this notebook to run successfully\n",
    "ITEM_INFO_COL = (\n",
    "    \"Book-Title\"  # must not be '_item_info' for this notebook to run successfully\n",
    ")\n",
    "RATING_COL = (\n",
    "    \"Book-Rating\"  # must not be '_rating' for this notebook to run successfully\n",
    ")\n",
    "IS_SAMPLE = True  # if True, use only <SAMPLE_ROWS> rows of data for training, otherwise use all data\n",
    "SAMPLE_ROWS = 5000  # if IS_SAMPLE is True, use only this number of rows for training\n",
    "\n",
    "DATA_FOLDER = \"Files/book-recommendation/\"  # folder containing the dataset\n",
    "ITEMS_FILE = \"Books.csv\"  # file containing the items information\n",
    "USERS_FILE = \"Users.csv\"  # file containing the users information\n",
    "RATINGS_FILE = \"Ratings.csv\"  # file containing the ratings information\n",
    "\n",
    "EXPERIMENT_NAME = \"aisample-recommendation\"  # mlflow experiment name"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "### Download the dataset and upload it to the lakehouse\n",
    "\n",
    "**Please add a lakehouse to the notebook before running it.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "if not IS_CUSTOM_DATA:\n",
    "    # Download the demo data files into the lakehouse if they do not exist yet\n",
    "    import os, requests\n",
    "\n",
    "    remote_url = \"https://synapseaisolutionsa.blob.core.windows.net/public/Book-Recommendation-Dataset\"\n",
    "    file_list = [\"Books.csv\", \"Ratings.csv\", \"Users.csv\"]\n",
    "    download_path = f\"/lakehouse/default/{DATA_FOLDER}/raw\"\n",
    "\n",
    "    if not os.path.exists(\"/lakehouse/default\"):\n",
    "        raise FileNotFoundError(\"Default lakehouse not found, please add a lakehouse.\")\n",
    "    os.makedirs(download_path, exist_ok=True)\n",
    "    for fname in file_list:\n",
    "        if not os.path.exists(f\"{download_path}/{fname}\"):\n",
    "            r = requests.get(f\"{remote_url}/{fname}\", timeout=30)\n",
    "            r.raise_for_status()  # fail early instead of saving an error page as data\n",
    "            with open(f\"{download_path}/{fname}\", \"wb\") as f:\n",
    "                f.write(r.content)\n",
    "    print(\"Downloaded demo data files into lakehouse.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# to record the notebook running time\n",
    "import time\n",
    "\n",
    "ts = time.time()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "### Read data from lakehouse"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "advisor": {
     "adviceMetadata": "{\"artifactId\":\"d0fa4e54-1ffe-4239-9441-a0432a4693ed\",\"activityId\":\"4c706a11-e7cd-47d0-9419-6e506cf48447\",\"applicationId\":\"application_1660013541887_0001\",\"jobGroupId\":\"8\",\"advices\":{\"error\":1}}"
    },
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "df_items = (\n",
    "    spark.read.option(\"header\", True)\n",
    "    .option(\"inferSchema\", True)\n",
    "    .csv(f\"{DATA_FOLDER}/raw/{ITEMS_FILE}\")\n",
    "    .cache()\n",
    ")\n",
    "\n",
    "df_ratings = (\n",
    "    spark.read.option(\"header\", True)\n",
    "    .option(\"inferSchema\", True)\n",
    "    .csv(f\"{DATA_FOLDER}/raw/{RATINGS_FILE}\")\n",
    "    .cache()\n",
    ")\n",
    "\n",
    "df_users = (\n",
    "    spark.read.option(\"header\", True)\n",
    "    .option(\"inferSchema\", True)\n",
    "    .csv(f\"{DATA_FOLDER}/raw/{USERS_FILE}\")\n",
    "    .cache()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "## Step 2. Exploratory Data Analysis\n",
    "\n",
    "### Display Raw Data\n",
    "\n",
    "We can explore the raw data with `display`, compute some basic statistics, or even show chart views."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "import pyspark.sql.functions as F\n",
    "from pyspark.ml.feature import StringIndexer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "display(df_items, summary=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "Add an `_item_id` column for later use. `_item_id` must be an integer for recommendation models. Here we use `StringIndexer` to transform `ITEM_ID_COL` into indices."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "df_items = (\n",
    "    StringIndexer(inputCol=ITEM_ID_COL, outputCol=\"_item_id\")\n",
    "    .setHandleInvalid(\"skip\")\n",
    "    .fit(df_items)\n",
    "    .transform(df_items)\n",
    "    .withColumn(\"_item_id\", F.col(\"_item_id\").cast(\"int\"))\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Display the items sorted by `_item_id` in descending order to check that the IDs increase monotonically and consecutively, as expected."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "display(df_items.sort(F.col(\"_item_id\").desc()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "display(df_users, summary=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "The `User-ID` column contains a missing value, so we drop the row that has it. If a custom dataset has no missing values, this step has no effect."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "df_users = df_users.dropna(subset=[USER_ID_COL])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "display(df_users, summary=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "Add a `_user_id` column for later use. `_user_id` must be an integer for recommendation models. Here we use `StringIndexer` to transform `USER_ID_COL` into indices.\n",
    "\n",
    "In this book dataset, the `User-ID` column is already an integer. We still add the `_user_id` column \n",
    "for compatibility with different datasets, which makes this notebook more robust."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "df_users = (\n",
    "    StringIndexer(inputCol=USER_ID_COL, outputCol=\"_user_id\")\n",
    "    .setHandleInvalid(\"skip\")\n",
    "    .fit(df_users)\n",
    "    .transform(df_users)\n",
    "    .withColumn(\"_user_id\", F.col(\"_user_id\").cast(\"int\"))\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "display(df_users.sort(F.col(\"_user_id\").desc()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "display(df_ratings, summary=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get the distinct ratings and save them to a list `ratings` for later use."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "ratings = [i[0] for i in df_ratings.select(RATING_COL).distinct().collect()]\n",
    "print(ratings)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "### Merge data\n",
    "Merge raw dataframes into one dataframe for more comprehensive analysis."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_all = df_ratings.join(df_users, USER_ID_COL, \"inner\").join(\n",
    "    df_items, ITEM_ID_COL, \"inner\"\n",
    ")\n",
    "df_all_columns = [\n",
    "    c for c in df_all.columns if c not in [\"_user_id\", \"_item_id\", RATING_COL]\n",
    "]\n",
    "\n",
    "# with this step, we can reorder the columns to make sure _user_id, _item_id and RATING_COL are the first three columns\n",
    "df_all = (\n",
    "    df_all.select([\"_user_id\", \"_item_id\", RATING_COL] + df_all_columns)\n",
    "    .withColumn(\"id\", F.monotonically_increasing_id())\n",
    "    .cache()\n",
    ")\n",
    "\n",
    "display(df_all)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "print(f\"Total Users: {df_users.select('_user_id').distinct().count()}\")\n",
    "print(f\"Total Items: {df_items.select('_item_id').distinct().count()}\")\n",
    "print(f\"Total User-Item Interactions: {df_all.count()}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "### Compute and Plot most popular items"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# import libs\n",
    "\n",
    "import pandas as pd  # dataframes\n",
    "import matplotlib.pyplot as plt  # plotting\n",
    "import seaborn as sns  # plotting\n",
    "\n",
    "color = sns.color_palette()  # adjusting plotting style"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# compute top popular products\n",
    "df_top_items = (\n",
    "    df_all.groupby([\"_item_id\"])\n",
    "    .count()\n",
    "    .join(df_items, \"_item_id\", \"inner\")\n",
    "    .sort([\"count\"], ascending=[0])\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# find top <topn> popular items\n",
    "topn = 10\n",
    "pd_top_items = df_top_items.limit(topn).toPandas()\n",
    "pd_top_items.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "The top `<topn>` popular items can be used for a recommendation section such as **\"Popular\"** or **\"Top purchased\"**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# Plot top <topn> items\n",
    "f, ax = plt.subplots(figsize=(12, 10))\n",
    "plt.xticks(rotation=\"vertical\")\n",
    "sns.barplot(x=ITEM_INFO_COL, y=\"count\", data=pd_top_items)\n",
    "plt.ylabel(\"Number of Ratings for the Item\")\n",
    "plt.xlabel(\"Item Name\")\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "## Step 3. Model development and deployment\n",
    "\n",
    "So far we have explored the dataset, added unique IDs to our users and items, and plotted the top items. Next, we'll train an Alternating Least Squares (ALS) recommender to give users personalized recommendations."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "### Prepare training and testing data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "if IS_SAMPLE:\n",
    "    # sort by '_user_id' before limit so that the sampled rows share users;\n",
    "    # if the train and test datasets have no _user_id in common, ALS will fail\n",
    "    df_all = df_all.sort(\"_user_id\").limit(SAMPLE_ROWS)\n",
    "\n",
    "# cast the rating column to the correct type\n",
    "df_all = df_all.withColumn(RATING_COL, F.col(RATING_COL).cast(\"float\"))\n",
    "\n",
    "# sampleBy takes a fraction between 0 and 1 per rating value and returns approximately that share of the matching rows.\n",
    "# e.g. fraction = 0.8 keeps about 80% of the rows with that rating.\n",
    "# Note that rating = 0 means the user didn't rate the item, so we can't use it for training.\n",
    "# The steps below select about 80% of the rows with rating > 0 as the training dataset.\n",
    "fractions_train = {0: 0}\n",
    "fractions_test = {0: 0}\n",
    "for i in ratings:\n",
    "    if i == 0:\n",
    "        continue\n",
    "    fractions_train[i] = 0.8\n",
    "    fractions_test[i] = 1\n",
    "train = df_all.sampleBy(RATING_COL, fractions=fractions_train)\n",
    "\n",
    "# a leftanti join keeps the rows of df_all that are not in train, so the step below selects all rows\n",
    "# with rating > 0 that are not in the training dataset, i.e. the remaining ~20%, as the test dataset.\n",
    "test = df_all.join(train, on=\"id\", how=\"leftanti\").sampleBy(\n",
    "    RATING_COL, fractions=fractions_test\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# compute the sparsity of the dataset\n",
    "def get_mat_sparsity(ratings):\n",
    "    # Count the total number of ratings in the dataset\n",
    "    count_nonzero = ratings.select(RATING_COL).count()\n",
    "    print(f\"Number of rows: {count_nonzero}\")\n",
    "\n",
    "    # Count the number of distinct user_id and distinct product_id\n",
    "    total_elements = (\n",
    "        ratings.select(\"_user_id\").distinct().count()\n",
    "        * ratings.select(\"_item_id\").distinct().count()\n",
    "    )\n",
    "\n",
    "    # Divide the numerator by the denominator\n",
    "    sparsity = (1.0 - (count_nonzero * 1.0) / total_elements) * 100\n",
    "    print(\"The ratings dataframe is \", \"%.4f\" % sparsity + \"% sparse.\")\n",
    "\n",
    "\n",
    "get_mat_sparsity(df_all)"
   ]
  },
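  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The percentage printed by `get_mat_sparsity` follows from a simple ratio of the observed interactions to all possible user-item pairs. In the formula below, $N_{rows}$, $N_{users}$ and $N_{items}$ are shorthand introduced here for the row count and the distinct `_user_id` and `_item_id` counts:\n",
    "\n",
    "$$\\text{sparsity} = \\left(1 - \\frac{N_{rows}}{N_{users} \\times N_{items}}\\right) \\times 100\\%$$\n",
    "\n",
    "As a worked example with purely hypothetical counts (not results from this dataset), 5,000 rows spread over 200 users and 4,000 items give\n",
    "\n",
    "$$\\left(1 - \\frac{5000}{200 \\times 4000}\\right) \\times 100\\% = (1 - 0.00625) \\times 100\\% = 99.375\\%$$\n",
    "\n",
    "so fewer than 1% of the possible user-item pairs would carry an interaction."
   ]
  },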
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# check the id range\n",
    "# ALS only supports values in Integer range\n",
    "print(f\"max user_id: {df_all.agg({'_user_id': 'max'}).collect()[0][0]}\")\n",
    "print(f\"max item_id: {df_all.agg({'_item_id': 'max'}).collect()[0][0]}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "### Define the Model\n",
    "\n",
    "With our data in place, we can now define the recommendation model. We'll apply the Alternating Least Squares (ALS) \n",
    "model in this notebook. \n",
    "\n",
    "Spark ML provides a convenient API for building the model. However, the model on its own does not handle \n",
    "problems like data sparsity and cold start well. We'll combine cross validation and automatic hyperparameter tuning \n",
    "to improve the performance of the model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# Specify training parameters\n",
    "num_epochs = 1\n",
    "rank_size_list = [64, 128]\n",
    "reg_param_list = [0.01, 0.1]\n",
    "model_tuning_method = \"TrainValidationSplit\"  # TrainValidationSplit or CrossValidator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.evaluation import RegressionEvaluator\n",
    "from pyspark.ml.recommendation import ALS\n",
    "from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit\n",
    "\n",
    "# Build the recommendation model using ALS on the training data\n",
    "# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics\n",
    "als = ALS(\n",
    "    maxIter=num_epochs,\n",
    "    userCol=\"_user_id\",\n",
    "    itemCol=\"_item_id\",\n",
    "    ratingCol=RATING_COL,\n",
    "    coldStartStrategy=\"drop\",\n",
    "    implicitPrefs=False,\n",
    "    nonnegative=True,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "### Model training and hyperparameter tuning"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# Define tuning parameters\n",
    "param_grid = (\n",
    "    ParamGridBuilder()\n",
    "    .addGrid(als.rank, rank_size_list)\n",
    "    .addGrid(als.regParam, reg_param_list)\n",
    "    .build()\n",
    ")\n",
    "\n",
    "print(\"Number of models to be tested: \", len(param_grid))"
   ]
  },
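  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The grid size printed above is the product of the lengths of the two parameter lists. Writing $N_{rank}$ and $N_{reg}$ (shorthand introduced here) for the lengths of `rank_size_list` and `reg_param_list`, the values used in this notebook give\n",
    "\n",
    "$$N_{grid} = N_{rank} \\times N_{reg} = 2 \\times 2 = 4$$\n",
    "\n",
    "As a rough cost estimate, assuming the tuner fits every candidate on each split and then refits the best candidate once on the full input, the number of ALS fits is about\n",
    "\n",
    "$$N_{grid} + 1 = 5 \\quad \\text{for TrainValidationSplit}, \\qquad N_{grid} \\times k + 1 = 4 \\times 5 + 1 = 21 \\quad \\text{for CrossValidator with } k = 5 \\text{ folds}$$\n",
    "\n",
    "which is why `TrainValidationSplit` is the cheaper choice when the grid or the dataset grows."
   ]
  },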
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# Define evaluator, set rmse as loss\n",
    "evaluator = RegressionEvaluator(\n",
    "    metricName=\"rmse\", labelCol=RATING_COL, predictionCol=\"prediction\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# Build cross validation using CrossValidator and TrainValidationSplit\n",
    "if model_tuning_method == \"CrossValidator\":\n",
    "    tuner = CrossValidator(\n",
    "        estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5\n",
    "    )\n",
    "elif model_tuning_method == \"TrainValidationSplit\":\n",
    "    tuner = TrainValidationSplit(\n",
    "        estimator=als,\n",
    "        estimatorParamMaps=param_grid,\n",
    "        evaluator=evaluator,\n",
    "        # 80% of the data will be used for training, 20% for validation.\n",
    "        trainRatio=0.8,\n",
    "    )\n",
    "else:\n",
    "    raise ValueError(f\"Unknown model_tuning_method: {model_tuning_method}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import pandas as pd\n",
    "\n",
    "# Train and Extract best model\n",
    "models = tuner.fit(train)\n",
    "model = models.bestModel\n",
    "\n",
    "if model_tuning_method == \"CrossValidator\":\n",
    "    metrics = models.avgMetrics\n",
    "elif model_tuning_method == \"TrainValidationSplit\":\n",
    "    metrics = models.validationMetrics\n",
    "else:\n",
    "    raise ValueError(f\"Unknown model_tuning_method: {model_tuning_method}\")\n",
    "\n",
    "param_maps = models.getEstimatorParamMaps()\n",
    "best_params = param_maps[np.argmin(metrics)]\n",
    "pd_metrics = pd.DataFrame(data={\"Metric\": metrics})\n",
    "\n",
    "print(\"** Best Model **\")\n",
    "for k in best_params:\n",
    "    print(f\"{k.name}: {best_params[k]}\")\n",
    "\n",
    "# collect metrics\n",
    "param_strings = []\n",
    "for param_map in param_maps:\n",
    "    # use split to remove the prefix 'ALS__' in param name\n",
    "    param_strings.append(\n",
    "        \" \".join(f\"{str(k).split('_')[-1]}={v}\" for (k, v) in param_map.items())\n",
    "    )\n",
    "pd_metrics[\"Params\"] = param_strings"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# Plot metrics of different submodels\n",
    "f, ax = plt.subplots(figsize=(12, 5))\n",
    "sns.lineplot(x=pd_metrics[\"Params\"], y=pd_metrics[\"Metric\"])\n",
    "plt.ylabel(\"Loss: RMSE\")\n",
    "plt.xlabel(\"Params\")\n",
    "plt.title(\"Loss of SubModels\")\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "### Model Evaluation\n",
    "\n",
    "Now that we have the best model, we can evaluate it further on the test data. \n",
    "If the model is trained well, it should score well on both the train and test datasets, i.e. low RMSE and MAE and high R2.\n",
    "If the metrics are good only on the train dataset, the model is overfitted and we may need to increase the training data size.\n",
    "If the metrics are bad on both datasets, the model is not well defined, and \n",
    "we may need to change the model architecture or at least fine-tune the hyperparameters."
   ]
  },
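  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For reference, three of the metrics reported in this section are commonly defined as follows. These are the textbook definitions, given as a sketch; the notation is assumed here, with $r_k$ the true rating, $\\hat{r}_k$ the predicted rating, $\\bar{r}$ the mean true rating, and $n$ the number of evaluated rows.\n",
    "\n",
    "$$\\text{RMSE} = \\sqrt{\\frac{1}{n} \\sum_{k=1}^{n} \\left(r_k - \\hat{r}_k\\right)^2}, \\qquad \\text{MAE} = \\frac{1}{n} \\sum_{k=1}^{n} \\left|r_k - \\hat{r}_k\\right|, \\qquad R^2 = 1 - \\frac{\\sum_{k=1}^{n} \\left(r_k - \\hat{r}_k\\right)^2}{\\sum_{k=1}^{n} \\left(r_k - \\bar{r}\\right)^2}$$\n",
    "\n",
    "Lower RMSE and MAE are better, while $R^2$ is better the closer it is to 1. $R^2$ becomes negative when the model's squared error exceeds that of always predicting the mean rating $\\bar{r}$."
   ]
  },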
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def evaluate(model, data):\n",
    "    \"\"\"\n",
    "    Evaluate the model by computing rmse, mae, r2 and var over the data.\n",
    "    \"\"\"\n",
    "\n",
    "    predictions = model.transform(data).withColumn(\n",
    "        \"prediction\", F.col(\"prediction\").cast(\"double\")\n",
    "    )\n",
    "\n",
    "    # show 10 predictions\n",
    "    predictions.select(\"_user_id\", \"_item_id\", RATING_COL, \"prediction\").limit(\n",
    "        10\n",
    "    ).show()\n",
    "\n",
    "    # initialize the regression evaluator\n",
    "    evaluator = RegressionEvaluator(predictionCol=\"prediction\", labelCol=RATING_COL)\n",
    "\n",
    "    _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)\n",
    "    rmse = _evaluator(\"rmse\")\n",
    "    mae = _evaluator(\"mae\")\n",
    "    r2 = _evaluator(\"r2\")\n",
    "    var = _evaluator(\"var\")\n",
    "\n",
    "    print(f\"RMSE score = {rmse}\")\n",
    "    print(f\"MAE score = {mae}\")\n",
    "    print(f\"R2 score = {r2}\")\n",
    "    print(f\"Explained variance = {var}\")\n",
    "\n",
    "    return predictions, (rmse, mae, r2, var)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "Evaluation on training data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "_ = evaluate(model, train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "Evaluation on test data.\n",
    "\n",
    "If R2 is negative, the trained model is actually worse than a horizontal straight line, i.e. a baseline that always predicts the mean rating."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "_, (rmse, mae, r2, var) = evaluate(model, test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "### Log and Load Model with MLflow\n",
    "Now that we have a trained model, we can save it for later use. Here we use MLflow to log metrics and models, and to load models back for prediction."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# setup mlflow\n",
    "import mlflow\n",
    "import trident.mlflow\n",
    "from trident.mlflow import get_sds_url\n",
    "\n",
    "mlflow.set_tracking_uri(get_sds_url())\n",
    "mlflow.set_registry_uri(get_sds_url())\n",
    "mlflow.set_experiment(EXPERIMENT_NAME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# log model, metrics and params\n",
    "with mlflow.start_run() as run:\n",
    "    print(\"log model:\")\n",
    "    mlflow.spark.log_model(\n",
    "        model,\n",
    "        f\"{EXPERIMENT_NAME}-alsmodel\",\n",
    "        registered_model_name=f\"{EXPERIMENT_NAME}-alsmodel\",\n",
    "        dfs_tmpdir=\"Files/spark\",\n",
    "    )\n",
    "\n",
    "    print(\"log metrics:\")\n",
    "    mlflow.log_metrics({\"RMSE\": rmse, \"MAE\": mae, \"R2\": r2, \"Explained variance\": var})\n",
    "\n",
    "    print(\"log parameters:\")\n",
    "    mlflow.log_params(\n",
    "        {\n",
    "            \"num_epochs\": num_epochs,\n",
    "            \"rank_size_list\": rank_size_list,\n",
    "            \"reg_param_list\": reg_param_list,\n",
    "            \"model_tuning_method\": model_tuning_method,\n",
    "            \"DATA_FOLDER\": DATA_FOLDER,\n",
    "        }\n",
    "    )\n",
    "\n",
    "    model_uri = f\"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-alsmodel\"\n",
    "    print(\"Model saved in run %s\" % run.info.run_id)\n",
    "    print(f\"Model URI: {model_uri}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# load model back\n",
    "# mlflow will use PipelineModel to wrapper the original model, thus here we extract the original ALSModel from the stages.\n",
    "loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir=\"Files/spark\").stages[-1]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "## Step 4. Save Prediction Results"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "### Model Deploy and Prediction"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "#### Offline Recommendation\n",
    "Recommend 10 items for each user\n",
    "\n",
    "##### Save offline recommendation results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# Generate top 10 product recommendations for each user\n",
    "userRecs = loaded_model.recommendForAllUsers(10)"
   ]
  },
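  {
   "cell_type": "markdown",
   "metadata": {
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "source": [
    "As a rough sketch of what `recommendForAllUsers` ranks by, using the factor matrices X and Y from the introduction: the predicted score for user $u$ and item $i$ is the dot product of the corresponding factor rows. The symbols $x_u$ (row of X for user $u$) and $y_i$ (row of Y for item $i$) are notation introduced here for illustration.\n",
    "\n",
    "$$\\hat{r}_{ui} = x_u^\\top y_i$$\n",
    "\n",
    "For each user, the 10 items with the largest $\\hat{r}_{ui}$ are returned in the `recommendations` column, with the score stored in the `rating` field."
   ]
  },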
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# convert recommendations into interpretable format\n",
    "userRecs = (\n",
    "    userRecs.withColumn(\"rec_exp\", F.explode(\"recommendations\"))\n",
    "    .select(\"_user_id\", F.col(\"rec_exp._item_id\"), F.col(\"rec_exp.rating\"))\n",
    "    .join(df_items.select([\"_item_id\", \"Book-Title\"]), on=\"_item_id\")\n",
    ")\n",
    "userRecs.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "# code for saving userRecs into lakehouse\n",
    "userRecs.write.format(\"delta\").mode(\"overwrite\").save(\n",
    "    f\"{DATA_FOLDER}/predictions/userRecs\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false,
     "source_hidden": false
    },
    "nteract": {
     "transient": {
      "deleting": false
     }
    }
   },
   "outputs": [],
   "source": [
    "print(f\"Full run cost {int(time.time() - ts)} seconds.\")"
   ]
  }
 ],
 "metadata": {
  "kernel_info": {
   "name": "synapse_pyspark"
  },
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.13"
  },
  "notebook_environment": {},
  "save_output": true,
  "spark_compute": {
   "compute_id": "/trident/default",
   "session_options": {
    "conf": {
     "spark.dynamicAllocation.enabled": "false",
     "spark.dynamicAllocation.maxExecutors": "2",
     "spark.dynamicAllocation.minExecutors": "2",
     "spark.livy.synapse.ipythonInterpreter.enabled": "true"
    },
    "driverCores": 8,
    "driverMemory": "56g",
    "enableDebugMode": false,
    "executorCores": 8,
    "executorMemory": "56g",
    "keepAliveTimeout": 30,
    "numExecutors": 5
   }
  },
  "synapse_widget": {
   "state": {},
   "version": "0.1"
  },
  "trident": {
   "lakehouse": {
    "default_lakehouse": "",
    "known_lakehouses": [
     {
      "id": ""
     }
    ]
   }
  },
  "vscode": {
   "interpreter": {
    "hash": "8cebba326b76ca708172f0a6a24a89689a3b64f83dbd9353b827f2f4b33d3f80"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
